-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adaptive rate limiting for OpenSearch bulk requests #1011
Adaptive rate limiting for OpenSearch bulk requests #1011
Conversation
- replace fail safe rate limiter for google guava's - move rate limiter from RestHighLevelClientWrapper to OpenSearchBulkRetryWrapper - add metrics for rate limit (now convert rate from double to int) - add spark conf for rate limit parameters - adjust rate limit based on retryable result percentage Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: Sean Kao <[email protected]>
flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkRetryWrapper.java
Outdated
Show resolved
Hide resolved
flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
Outdated
Show resolved
Hide resolved
flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Sean Kao <[email protected]>
55f6c83
to
beca67f
Compare
Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: Sean Kao <[email protected]>
beca67f
to
c0865ce
Compare
Signed-off-by: Sean Kao <[email protected]>
07b5173
to
43424d7
Compare
Signed-off-by: Sean Kao <[email protected]>
Signed-off-by: Sean Kao <[email protected]>
c20dbf8
to
b6e9b04
Compare
Signed-off-by: Sean Kao <[email protected]>
b6e9b04
to
d55bb13
Compare
Signed-off-by: Sean Kao <[email protected]>
72e8996
to
7c2c734
Compare
Signed-off-by: Sean Kao <[email protected]>
7c2c734
to
fa2a4c3
Compare
@@ -535,7 +535,11 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i | |||
- `spark.datasource.flint.write.batch_size`: "The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE. | |||
- `spark.datasource.flint.write.batch_bytes`: The approximately amount of data in bytes written to Flint in a single batch request. The actual data write to OpenSearch may more than it. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero. | |||
- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)] | |||
- `spark.datasource.flint.write.bulkRequestRateLimitPerNode`: [Experimental] Rate limit(request/sec) for bulk request per worker node. Only accept integer value. To reduce the traffic less than 1 req/sec, batch_bytes or batch_size should be reduced. Default value is 0, which disables rate limit. | |||
- `spark.datasource.flint.write.bulk.rate_limit_per_node.enabled`: [Experimental] Enable rate limit for bulk request per worker node. Default is false. | |||
- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure what is the main purpose of having min value? is there any concerns if the rate limiter goes down to zero incase of continuous failures?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For one, min value is used for initial rate limit.
Second is that rate could only increase if there's some successful request, so we can't have 0 rate limit, or else it wouldn't bounce back at all.
Having too small of a rate could also be troublesome, because that means the rate limiter will react slower. For example, say the rate limit becomes 10 docs per sec, however a single request consists of 1000 docs. Then once the 1000 docs request go through, there'll be 100 seconds where other requests couldn't go through (so it satisfy the 10 doc per sec rate). That's 100 seconds of not updating the rate.
- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000. | ||
- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not exceed this value. Set to -1 for no upper bound. Default is 50000. | ||
- `spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step`: [Experimental] Adaptive rate limit increase step for bulk request per worker node, if rate limit enabled. Must be greater than 0. Default is 500. | ||
- `spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio`: [Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1. Default is 0.8. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why we are using steps to increase and ratio to decrease? can this be consistent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this uses the idea of AIMD (additive-increase, multiplicative-decrease) algorithm for TCP congestion control.
Multiple flows using AIMD congestion control will eventually converge to an equal usage of a shared link
I'm not well versed in the mathematics behind this but AIAD (additive-increase additive-decrease), and MIMD (multiplicative-increase multiplicative-decrease) are both said not able to reach stability in dynamic systems
- `spark.datasource.flint.write.bulk.rate_limit_per_node.min`: [Experimental] Lower limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not drop below this value. Must be greater than 0. Default is 5000. | ||
- `spark.datasource.flint.write.bulk.rate_limit_per_node.max`: [Experimental] Upper limit (documents/sec) for adaptive rate limiting for bulk request per worker node, if rate limit enabled. The adaptive rate will not exceed this value. Set to -1 for no upper bound. Default is 50000. | ||
- `spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step`: [Experimental] Adaptive rate limit increase step for bulk request per worker node, if rate limit enabled. Must be greater than 0. Default is 500. | ||
- `spark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio`: [Experimental] Adaptive rate limit decrease ratio for bulk request per worker node, if rate limit enabled. Must be between 0 and 1. Default is 0.8. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0.8 ratio looks too aggressive. Do you have some testing to explain why you chose 0.8?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mainly because 0.8^3 ~= 0.5
and TCP congestion control typically uses a decrease rate of 0.5
So if a bulk request attempts 3 times and all fails, we cut the rate limit in half.
/** | ||
* Rate getter and setter are public for testing purpose | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we replace it with @VisibleForTest
? (same for setRate)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
making an interface class. leaving the getter and setter as just part of the interface. restricting it for test doesn't seem necessary
flint-core/src/test/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapperTest.java
Outdated
Show resolved
Hide resolved
flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java
Outdated
Show resolved
Hide resolved
flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiter.java
Outdated
Show resolved
Hide resolved
flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchBulkWrapper.java
Show resolved
Hide resolved
- swap parameter for test case asserts - remove excessive null check (create noop impl for rate limiter) Signed-off-by: Sean Kao <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes!
Description
Add (client side) adaptive rate limiting for OpenSearch bulk requests. For each successful bulk request, rate limit will increase linearly; for each failure, rate limit will decrease exponentially.
Each worker node adaptively adjust their own rate limit based on responses they get from the OpenSearch cluster. They update their rate limit each time they receive a response.
Retry requests will not be rate limited. They will go through regardless of the rate limit. Only new request will be blocked by rate limiter. An alternative is to have retries also be rate limited. However, this will result in slowing down retries and increasing overall response time, since we do not ensure fairness or priority. Retries could be delayed due to new requests consuming the available rate.
Several new spark properties are added:
spark.datasource.flint.write.bulk.rate_limit_per_node.enabled
: to enable the featurespark.datasource.flint.write.bulk.rate_limit_per_node.min
: min rate limit (documents/sec)spark.datasource.flint.write.bulk.rate_limit_per_node.max
: max rate limit (documents/sec)spark.datasource.flint.write.bulk.rate_limit_per_node.increase_step
: rate limit increase step for successful bulk requestspark.datasource.flint.write.bulk.rate_limit_per_node.decrease_ratio
: rate limit decrease ratio for failed bulk requestThis PR also introduces a new (shaded) dependency:
Test
Next Steps
The rate limit metric would be more useful if they can have more granular dimensions so we can see rate limit per node, or per OpenSearch domain/collection
Related Issues